/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.master

import java.text.SimpleDateFormat
import java.util.concurrent.{ScheduledFuture, TimeUnit}
import java.util.{Date, Locale}

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.util.Random

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState, SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.deploy.rest.StandaloneRestServer
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc._
import org.apache.spark.serializer.{JavaSerializer, Serializer}
import org.apache.spark.util.{ThreadUtils, Utils}

private[deploy] class Master(
                                override val rpcEnv: RpcEnv,
                                address: RpcAddress,
                                webUiPort: Int,
                                val securityMgr: SecurityManager,
                                val conf: SparkConf)
    extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {

    private val forwardMessageThread =
        ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")

    private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)

    // For application IDs
    private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)

    // Worker timeout in milliseconds (spark.worker.timeout is given in seconds, default 60)
    private val WORKER_TIMEOUT_MS = conf.getLong("spark.worker.timeout", 60) * 1000
    private val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
    private val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
    // Number of timeout-check rounds a dead worker is kept around before being dropped entirely
    private val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
    private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
    private val MAX_EXECUTOR_RETRIES = conf.getInt("spark.deploy.maxExecutorRetries", 10)

    val workers = new HashSet[WorkerInfo]
    val idToApp = new HashMap[String, ApplicationInfo]
    private val waitingApps = new ArrayBuffer[ApplicationInfo]
    val apps = new HashSet[ApplicationInfo]

    private val idToWorker = new HashMap[String, WorkerInfo]
    private val addressToWorker = new HashMap[RpcAddress, WorkerInfo]

    private val endpointToApp = new HashMap[RpcEndpointRef, ApplicationInfo]
    private val addressToApp = new HashMap[RpcAddress, ApplicationInfo]
    private val completedApps = new ArrayBuffer[ApplicationInfo]
    private var nextAppNumber = 0

    private val drivers = new HashSet[DriverInfo]
    private val completedDrivers = new ArrayBuffer[DriverInfo]
    // Drivers currently spooled for scheduling
    private val waitingDrivers = new ArrayBuffer[DriverInfo]
    private var nextDriverNumber = 0

    Utils.checkHost(address.host, "Expected hostname")

    private val masterMetricsSystem = MetricsSystem.createMetricsSystem("master", conf, securityMgr)
    private val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications", conf,
        securityMgr)
    private val masterSource = new MasterSource(this)

    // After onStart, webUi will be set
    private var webUi: MasterWebUI = null

    private val masterPublicAddress = {
        val envVar = conf.getenv("SPARK_PUBLIC_DNS")
        if (envVar != null) envVar else address.host
    }

    private val masterUrl = address.toSparkURL
    private var masterWebUiUrl: String = _

    private var state = RecoveryState.STANDBY

    private var persistenceEngine: PersistenceEngine = _

    private var leaderElectionAgent: LeaderElectionAgent = _

    private var recoveryCompletionTask: ScheduledFuture[_] = _

    private var checkForWorkerTimeOutTask: ScheduledFuture[_] = _

    // As a temporary workaround before better ways of configuring memory, we allow users to set
    // a flag that will perform round-robin scheduling across the nodes (spreading out each app
    // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
    private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)

    // Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue)
    private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
    val reverseProxy = conf.getBoolean("spark.ui.reverseProxy", false)
    if (defaultCores < 1) {
        throw new SparkException("spark.deploy.defaultCores must be positive")
    }

    // Alternative application submission gateway that is stable across Spark versions
    private val restServerEnabled = conf.getBoolean("spark.master.rest.enabled", true)
    private var restServer: Option[StandaloneRestServer] = None
    private var restServerBoundPort: Option[Int] = None

    override def onStart(): Unit = {
        logInfo("Starting Spark master at " + masterUrl)
        logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
        // Create and bind the web UI server
        webUi = new MasterWebUI(this, webUiPort)
        webUi.bind()
        masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
        if (reverseProxy) {
            masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl)
            logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " +
                s"Applications UIs are available at $masterWebUiUrl")
        }
        // Schedule a task at a fixed rate that checks whether any worker has timed out, simply by
        // sending CheckForWorkerTimeOut to ourselves; with the default timeout this runs once a minute.
        checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
                self.send(CheckForWorkerTimeOut)
            }
        }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)

        if (restServerEnabled) {
            val port = conf.getInt("spark.master.rest.port", 6066)
            restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
        }

        restServerBoundPort = restServer.map(_.start())

        masterMetricsSystem.registerSource(masterSource)
        masterMetricsSystem.start()
        applicationMetricsSystem.start()
        // Attach the master and app metrics servlet handler to the web ui after the metrics systems are
        // started.
        masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
        applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)

        val serializer = new JavaSerializer(conf)
        val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
            case "ZOOKEEPER" =>
                logInfo("Persisting recovery state to ZooKeeper")
                val zkFactory =
                    new ZooKeeperRecoveryModeFactory(conf, serializer)
                (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
            case "FILESYSTEM" =>
                val fsFactory =
                    new FileSystemRecoveryModeFactory(conf, serializer)
                (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
            case "CUSTOM" =>
                val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
                val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
                    .newInstance(conf, serializer)
                    .asInstanceOf[StandaloneRecoveryModeFactory]
                (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
            case _ =>
                (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
        }
        persistenceEngine = persistenceEngine_
        leaderElectionAgent = leaderElectionAgent_
    }

    override def onStop() {
        masterMetricsSystem.report()
        applicationMetricsSystem.report()
        // Prevent the CompleteRecovery message from being sent to a restarted master
        if (recoveryCompletionTask != null) {
            recoveryCompletionTask.cancel(true)
        }
        if (checkForWorkerTimeOutTask != null) {
            checkForWorkerTimeOutTask.cancel(true)
        }
        forwardMessageThread.shutdownNow()
        webUi.stop()
        restServer.foreach(_.stop())
        masterMetricsSystem.stop()
        applicationMetricsSystem.stop()
        persistenceEngine.close()
        leaderElectionAgent.stop()
    }

    override def electedLeader() {
        self.send(ElectedLeader)
    }

    override def revokedLeadership() {
        self.send(RevokedLeadership)
    }

    override def receive: PartialFunction[Any, Unit] = {
        case ElectedLeader =>
            val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
            state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
                RecoveryState.ALIVE
            } else {
                RecoveryState.RECOVERING
            }
            logInfo("I have been elected leader! New state: " + state)
            if (state == RecoveryState.RECOVERING) {
                beginRecovery(storedApps, storedDrivers, storedWorkers)
                recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
                    override def run(): Unit = Utils.tryLogNonFatalError {
                        self.send(CompleteRecovery)
                    }
                }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
            }

        case CompleteRecovery => completeRecovery()

        case RevokedLeadership =>
            logError("Leadership has been revoked -- master shutting down.")
            System.exit(0)

        case RegisterApplication(description, driver) =>
            // TODO Prevent repeated registrations from some driver
            if (state == RecoveryState.STANDBY) {
                // ignore, don't send response
            } else {
                logInfo("Registering app " + description.name)
                val app = createApplication(description, driver)
                registerApplication(app)
                logInfo("Registered app " + description.name + " with ID " + app.id)
                persistenceEngine.addApplication(app)
                driver.send(RegisteredApplication(app.id, self))
                schedule()
            }

        case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
            val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
            execOption match {
                case Some(exec) =>
                    val appInfo = idToApp(appId)
                    val oldState = exec.state
                    exec.state = state

                    if (state == ExecutorState.RUNNING) {
                        assert(oldState == ExecutorState.LAUNCHING,
                            s"executor $execId state transfer from $oldState to RUNNING is illegal")
                        appInfo.resetRetryCount()
                    }

                    exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))

                    if (ExecutorState.isFinished(state)) {
                        // Remove this executor from the worker and app
                        logInfo(s"Removing executor ${exec.fullId} because it is $state")
                        // If an application has already finished, preserve its
                        // state to display its information properly on the UI
                        if (!appInfo.isFinished) {
                            appInfo.removeExecutor(exec)
                        }
                        exec.worker.removeExecutor(exec)

                        val normalExit = exitStatus == Some(0)
                        // Only retry certain number of times so we don't go into an infinite loop.
                        // Important note: this code path is not exercised by tests, so be very careful when
                        // changing this `if` condition.
                        if (!normalExit
                            && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
                            && MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
                            val execs = appInfo.executors.values
                            if (!execs.exists(_.state == ExecutorState.RUNNING)) {
                                logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                                    s"${appInfo.retryCount} times; removing it")
                                removeApplication(appInfo, ApplicationState.FAILED)
                            }
                        }
                    }
                    schedule()
                case None =>
                    logWarning(s"Got status update for unknown executor $appId/$execId")
            }

        case DriverStateChanged(driverId, state, exception) =>
            state match {
                case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
                    removeDriver(driverId, state, exception)
                case _ =>
                    throw new Exception(s"Received unexpected state update for driver $driverId: $state")
            }

        case Heartbeat(workerId, worker) =>
            idToWorker.get(workerId) match {
                case Some(workerInfo) =>
                    // Record the worker's latest heartbeat time
                    workerInfo.lastHeartbeat = System.currentTimeMillis()
                case None =>
                    if (workers.map(_.id).contains(workerId)) {
                        logWarning(s"Got heartbeat from unregistered worker $workerId." +
                            " Asking it to re-register.")
                        worker.send(ReconnectWorker(masterUrl))
                    } else {
                        logWarning(s"Got heartbeat from unregistered worker $workerId." +
                            " This worker was never registered, so ignoring the heartbeat.")
                    }
            }

        case MasterChangeAcknowledged(appId) =>
            idToApp.get(appId) match {
                case Some(app) =>
                    logInfo("Application has been re-registered: " + appId)
                    app.state = ApplicationState.WAITING
                case None =>
                    logWarning("Master change ack from unknown app: " + appId)
            }

            if (canCompleteRecovery) {
                completeRecovery()
            }

        case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>
            idToWorker.get(workerId) match {
                case Some(worker) =>
                    logInfo("Worker has been re-registered: " + workerId)
                    worker.state = WorkerState.ALIVE

                    val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
                    for (exec <- validExecutors) {
                        val app = idToApp.get(exec.appId).get
                        val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
                        worker.addExecutor(execInfo)
                        execInfo.copyState(exec)
                    }

                    for (driverId <- driverIds) {
                        drivers.find(_.id == driverId).foreach { driver =>
                            driver.worker = Some(worker)
                            driver.state = DriverState.RUNNING
                            worker.drivers(driverId) = driver
                        }
                    }
                case None =>
                    logWarning("Scheduler state from unknown worker: " + workerId)
            }

            if (canCompleteRecovery) {
                completeRecovery()
            }

        case WorkerLatestState(workerId, executors, driverIds) =>
            idToWorker.get(workerId) match {
                case Some(worker) =>
                    for (exec <- executors) {
                        val executorMatches = worker.executors.exists {
                            case (_, e) => e.application.id == exec.appId && e.id == exec.execId
                        }
                        if (!executorMatches) {
                            // master doesn't recognize this executor. So just tell worker to kill it.
                            worker.endpoint.send(KillExecutor(masterUrl, exec.appId, exec.execId))
                        }
                    }

                    for (driverId <- driverIds) {
                        val driverMatches = worker.drivers.exists { case (id, _) => id == driverId }
                        if (!driverMatches) {
                            // master doesn't recognize this driver. So just tell worker to kill it.
                            worker.endpoint.send(KillDriver(driverId))
                        }
                    }
                case None =>
                    logWarning("Worker state from unknown worker: " + workerId)
            }

        case UnregisterApplication(applicationId) =>
            logInfo(s"Received unregister request from application $applicationId")
            idToApp.get(applicationId).foreach(finishApplication)

        case CheckForWorkerTimeOut =>
            timeOutDeadWorkers()

    }

    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
        // Handle a worker registration request
        case RegisterWorker(
        id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
            logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
                workerHost, workerPort, cores, Utils.megabytesToString(memory)))
            if (state == RecoveryState.STANDBY) {
                // Reply to the sender; the registering worker will handle this response
                context.reply(MasterInStandby)
            } else if (idToWorker.contains(id)) { // A worker with this ID is already registered
                context.reply(RegisterWorkerFailed("Duplicate worker ID"))
            } else {
                // Wrap the registration information in a WorkerInfo
                val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
                    workerRef, workerWebUiUrl)
                if (registerWorker(worker)) {  // Registration succeeded
                    persistenceEngine.addWorker(worker)
                    // Reply with the registration response
                    context.reply(RegisteredWorker(self, masterWebUiUrl))
                    schedule()
                } else {
                    val workerAddress = worker.endpoint.address
                    logWarning("Worker registration failed. Attempted to re-register worker at same " +
                        "address: " + workerAddress)
                    context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
                        + workerAddress))
                }
            }

        case RequestSubmitDriver(description) =>
            if (state != RecoveryState.ALIVE) {
                val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
                    "Can only accept driver submissions in ALIVE state."
                context.reply(SubmitDriverResponse(self, false, None, msg))
            } else {
                logInfo("Driver submitted " + description.command.mainClass)
                val driver = createDriver(description)
                persistenceEngine.addDriver(driver)
                waitingDrivers += driver
                drivers.add(driver)
                schedule()

                // TODO: It might be good to instead have the submission client poll the master to determine
                //       the current status of the driver. For now it's simply "fire and forget".

                context.reply(SubmitDriverResponse(self, true, Some(driver.id),
                    s"Driver successfully submitted as ${driver.id}"))
            }

        case RequestKillDriver(driverId) =>
            if (state != RecoveryState.ALIVE) {
                val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
                    s"Can only kill drivers in ALIVE state."
                context.reply(KillDriverResponse(self, driverId, success = false, msg))
            } else {
                logInfo("Asked to kill driver " + driverId)
                val driver = drivers.find(_.id == driverId)
                driver match {
                    case Some(d) =>
                        if (waitingDrivers.contains(d)) {
                            waitingDrivers -= d
                            self.send(DriverStateChanged(driverId, DriverState.KILLED, None))
                        } else {
                            // We just notify the worker to kill the driver here. The final bookkeeping occurs
                            // on the return path when the worker submits a state change back to the master
                            // to notify it that the driver was successfully killed.
                            d.worker.foreach { w =>
                                w.endpoint.send(KillDriver(driverId))
                            }
                        }
                        // TODO: It would be nice for this to be a synchronous response
                        val msg = s"Kill request for $driverId submitted"
                        logInfo(msg)
                        context.reply(KillDriverResponse(self, driverId, success = true, msg))
                    case None =>
                        val msg = s"Driver $driverId has already finished or does not exist"
                        logWarning(msg)
                        context.reply(KillDriverResponse(self, driverId, success = false, msg))
                }
            }

        case RequestDriverStatus(driverId) =>
            if (state != RecoveryState.ALIVE) {
                val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
                    "Can only request driver status in ALIVE state."
                context.reply(
                    DriverStatusResponse(found = false, None, None, None, Some(new Exception(msg))))
            } else {
                (drivers ++ completedDrivers).find(_.id == driverId) match {
                    case Some(driver) =>
                        context.reply(DriverStatusResponse(found = true, Some(driver.state),
                            driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception))
                    case None =>
                        context.reply(DriverStatusResponse(found = false, None, None, None, None))
                }
            }

        case RequestMasterState =>
            context.reply(MasterStateResponse(
                address.host, address.port, restServerBoundPort,
                workers.toArray, apps.toArray, completedApps.toArray,
                drivers.toArray, completedDrivers.toArray, state))

        case BoundPortsRequest =>
            context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))

        case RequestExecutors(appId, requestedTotal) =>
            context.reply(handleRequestExecutors(appId, requestedTotal))

        case KillExecutors(appId, executorIds) =>
            val formattedExecutorIds = formatExecutorIds(executorIds)
            context.reply(handleKillExecutors(appId, formattedExecutorIds))
    }

    override def onDisconnected(address: RpcAddress): Unit = {
        // The disconnected client could've been either a worker or an app; remove whichever it was
        logInfo(s"$address got disassociated, removing it.")
        addressToWorker.get(address).foreach(removeWorker)
        addressToApp.get(address).foreach(finishApplication)
        if (state == RecoveryState.RECOVERING && canCompleteRecovery) {
            completeRecovery()
        }
    }

    private def canCompleteRecovery =
        workers.count(_.state == WorkerState.UNKNOWN) == 0 &&
            apps.count(_.state == ApplicationState.UNKNOWN) == 0

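    /**
      * Re-register the applications and workers read back from the persistence engine and re-add
      * the stored drivers. Recovered apps and workers start out in the UNKNOWN state and are asked
      * (via MasterChanged) to acknowledge the new master; recovery then completes either once all
      * of them have responded or after the worker timeout elapses (see completeRecovery).
      */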
    private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
                              storedWorkers: Seq[WorkerInfo]) {
        for (app <- storedApps) {
            logInfo("Trying to recover app: " + app.id)
            try {
                registerApplication(app)
                app.state = ApplicationState.UNKNOWN
                app.driver.send(MasterChanged(self, masterWebUiUrl))
            } catch {
                case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
            }
        }

        for (driver <- storedDrivers) {
            // Here we just read in the list of drivers. Any drivers associated with now-lost workers
            // will be re-launched when we detect that the worker is missing.
            drivers += driver
        }

        for (worker <- storedWorkers) {
            logInfo("Trying to recover worker: " + worker.id)
            try {
                registerWorker(worker)
                worker.state = WorkerState.UNKNOWN
                worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
            } catch {
                case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
            }
        }
    }

    private def completeRecovery() {
        // Ensure "only-once" recovery semantics using a short synchronization period.
        if (state != RecoveryState.RECOVERING) {
            return
        }
        state = RecoveryState.COMPLETING_RECOVERY

        // Kill off any workers and apps that didn't respond to us.
        workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
        apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

        // Reschedule drivers which were not claimed by any workers
        drivers.filter(_.worker.isEmpty).foreach { d =>
            logWarning(s"Driver ${d.id} was not found after master recovery")
            if (d.desc.supervise) {
                logWarning(s"Re-launching ${d.id}")
                relaunchDriver(d)
            } else {
                removeDriver(d.id, DriverState.ERROR, None)
                logWarning(s"Did not re-launch ${d.id} because it was not supervised")
            }
        }

        state = RecoveryState.ALIVE
        schedule()
        logInfo("Recovery complete - resuming operations!")
    }

    /**
      * Schedule executors to be launched on the workers.
      * Returns an array containing the number of cores assigned to each worker.
      *
      * There are two modes of launching executors. The first attempts to spread out an application's
      * executors on as many workers as possible, while the second does the opposite (i.e. launch them
      * on as few workers as possible). The former is usually better for data locality purposes and is
      * the default.
      *
      * The number of cores assigned to each executor is configurable. When this is explicitly set,
      * multiple executors from the same application may be launched on the same worker if the worker
      * has enough cores and memory. Otherwise, each executor grabs all the cores available on the
      * worker by default, in which case only one executor may be launched on each worker.
      *
      * It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
      * at a time). Consider the following example: cluster has 4 workers with 16 cores each.
      * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
      * allocated at a time, 12 cores from each worker would be assigned to each executor.
      * Since 12 < 16, no executors would launch [SPARK-8881].
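      *
      * Illustrative walk-through (hypothetical numbers, assuming memory and the executor limit are
      * not the constraint): with 2 usable workers of 8 free cores each, spark.executor.cores = 2 and
      * spark.cores.max = 8, spreading out assigns cores in rounds of 2 per worker and ends with
      * assignedCores = [4, 4] (two executors per worker); without spreading out, the first worker is
      * filled before moving on, ending with assignedCores = [8, 0] (four executors on one worker).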
      */
    private def scheduleExecutorsOnWorkers(
                                              app: ApplicationInfo,
                                              usableWorkers: Array[WorkerInfo],
                                              spreadOutApps: Boolean): Array[Int] = {
        val coresPerExecutor = app.desc.coresPerExecutor
        val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
        val oneExecutorPerWorker = coresPerExecutor.isEmpty
        val memoryPerExecutor = app.desc.memoryPerExecutorMB
        val numUsable = usableWorkers.length
        val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
        val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
        var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

        /** Return whether the specified worker can launch an executor for this app. */
        def canLaunchExecutor(pos: Int): Boolean = {
            val keepScheduling = coresToAssign >= minCoresPerExecutor
            val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor

            // If we allow multiple executors per worker, then we can always launch new executors.
            // Otherwise, if there is already an executor on this worker, just give it more cores.
            val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
            if (launchingNewExecutor) {
                val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
                val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
                val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
                keepScheduling && enoughCores && enoughMemory && underLimit
            } else {
                // We're adding cores to an existing executor, so no need
                // to check memory and executor limits
                keepScheduling && enoughCores
            }
        }

        // Keep launching executors until no more workers can accommodate any
        // more executors, or if we have reached this application's limits
        var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
        while (freeWorkers.nonEmpty) {
            freeWorkers.foreach { pos =>
                var keepScheduling = true
                while (keepScheduling && canLaunchExecutor(pos)) {
                    coresToAssign -= minCoresPerExecutor
                    assignedCores(pos) += minCoresPerExecutor

                    // If we are launching one executor per worker, then every iteration assigns 1 core
                    // to the executor. Otherwise, every iteration assigns cores to a new executor.
                    if (oneExecutorPerWorker) {
                        assignedExecutors(pos) = 1
                    } else {
                        assignedExecutors(pos) += 1
                    }

                    // Spreading out an application means spreading out its executors across as
                    // many workers as possible. If we are not spreading out, then we should keep
                    // scheduling executors on this worker until we use all of its resources.
                    // Otherwise, just move on to the next worker.
                    if (spreadOutApps) {
                        keepScheduling = false
                    }
                }
            }
            freeWorkers = freeWorkers.filter(canLaunchExecutor)
        }
        assignedCores
    }

    /**
      * Schedule and launch executors on workers
      */
    private def startExecutorsOnWorkers(): Unit = {
        // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
        // in the queue, then the second app, etc.
        for (app <- waitingApps if app.coresLeft > 0) {
            val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
            // Filter out workers that don't have enough resources to launch an executor
            val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
                .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
                    worker.coresFree >= coresPerExecutor.getOrElse(1))
                .sortBy(_.coresFree).reverse
            val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

            // Now that we've decided how many cores to allocate on each worker, let's allocate them
            for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
                allocateWorkerResourceToExecutors(
                    app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
            }
        }
    }

    /**
      * Allocate a worker's resources to one or more executors.
      *
      * @param app              the info of the application which the executors belong to
      * @param assignedCores    number of cores on this worker for this application
      * @param coresPerExecutor number of cores per executor
      * @param worker           the worker info
      */
    private def allocateWorkerResourceToExecutors(
                                                     app: ApplicationInfo,
                                                     assignedCores: Int,
                                                     coresPerExecutor: Option[Int],
                                                     worker: WorkerInfo): Unit = {
        // If the number of cores per executor is specified, we divide the cores assigned
        // to this worker evenly among the executors with no remainder.
        // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
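        // Illustrative example: with assignedCores = 8, coresPerExecutor = Some(2) yields 4 executors
        // of 2 cores each, while coresPerExecutor = None yields a single executor with all 8 cores.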
        val numExecutors = coresPerExecutor.map {
            assignedCores / _
        }.getOrElse(1)
        val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
        for (i <- 1 to numExecutors) {
            val exec = app.addExecutor(worker, coresToAssign)
            launchExecutor(worker, exec)
            app.state = ApplicationState.RUNNING
        }
    }

    /**
      * Schedule the currently available resources among waiting apps. This method will be called
      * every time a new app joins or resource availability changes.
      */
    private def schedule(): Unit = {
        if (state != RecoveryState.ALIVE) {
            return
        }
        // Drivers take strict precedence over executors
        val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
        val numWorkersAlive = shuffledAliveWorkers.size
        var curPos = 0
        for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
            // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
            // start from the last worker that was assigned a driver, and continue onwards until we have
            // explored all alive workers.
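            // For example, with shuffled workers [w1, w2, w3] and two waiting drivers, the search for
            // the second driver resumes just after the worker that launched the first one (curPos is
            // shared across iterations), so drivers are not all packed onto the same worker.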
            var launched = false
            var numWorkersVisited = 0
            while (numWorkersVisited < numWorkersAlive && !launched) {
                val worker = shuffledAliveWorkers(curPos)
                numWorkersVisited += 1
                if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
                    launchDriver(worker, driver)
                    waitingDrivers -= driver
                    launched = true
                }
                curPos = (curPos + 1) % numWorkersAlive
            }
        }
        startExecutorsOnWorkers()
    }

    private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
        logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
        worker.addExecutor(exec)
        worker.endpoint.send(LaunchExecutor(masterUrl,
            exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
        exec.application.driver.send(
            ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
    }

    private def registerWorker(worker: WorkerInfo): Boolean = {
        // There may be one or more refs to dead workers on this same node (with different IDs);
        // remove them.
        workers.filter { w =>
            (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
        }.foreach { w =>
            workers -= w
        }

        val workerAddress = worker.endpoint.address
        if (addressToWorker.contains(workerAddress)) {
            val oldWorker = addressToWorker(workerAddress)
            if (oldWorker.state == WorkerState.UNKNOWN) {
                // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
                // The old worker must thus be dead, so we will remove it and accept the new worker.
                removeWorker(oldWorker)
            } else {
                logInfo("Attempted to re-register worker at same address: " + workerAddress)
                return false
            }
        }

        workers += worker
        idToWorker(worker.id) = worker
        addressToWorker(workerAddress) = worker
        if (reverseProxy) {
            webUi.addProxyTargets(worker.id, worker.webUiAddress)
        }
        true
    }

    private def removeWorker(worker: WorkerInfo) {
        logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
        worker.setState(WorkerState.DEAD)
        idToWorker -= worker.id
        addressToWorker -= worker.endpoint.address
        if (reverseProxy) {
            webUi.removeProxyTargets(worker.id)
        }
        for (exec <- worker.executors.values) {
            logInfo("Telling app of lost executor: " + exec.id)
            exec.application.driver.send(ExecutorUpdated(
                exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true))
            exec.state = ExecutorState.LOST
            exec.application.removeExecutor(exec)
        }
        for (driver <- worker.drivers.values) {
            if (driver.desc.supervise) {
                logInfo(s"Re-launching ${driver.id}")
                relaunchDriver(driver)
            } else {
                logInfo(s"Not re-launching ${driver.id} because it was not supervised")
                removeDriver(driver.id, DriverState.ERROR, None)
            }
        }
        persistenceEngine.removeWorker(worker)
    }

    private def relaunchDriver(driver: DriverInfo) {
        driver.worker = None
        driver.state = DriverState.RELAUNCHING
        waitingDrivers += driver
        schedule()
    }

    private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef):
    ApplicationInfo = {
        val now = System.currentTimeMillis()
        val date = new Date(now)
        val appId = newApplicationId(date)
        new ApplicationInfo(now, appId, desc, date, driver, defaultCores)
    }

    private def registerApplication(app: ApplicationInfo): Unit = {
        val appAddress = app.driver.address
        if (addressToApp.contains(appAddress)) {
            logInfo("Attempted to re-register application at same address: " + appAddress)
            return
        }

        applicationMetricsSystem.registerSource(app.appSource)
        apps += app
        idToApp(app.id) = app
        endpointToApp(app.driver) = app
        addressToApp(appAddress) = app
        waitingApps += app
        if (reverseProxy) {
            webUi.addProxyTargets(app.id, app.desc.appUiUrl)
        }
    }

    private def finishApplication(app: ApplicationInfo) {
        removeApplication(app, ApplicationState.FINISHED)
    }

    def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
        if (apps.contains(app)) {
            logInfo("Removing app " + app.id)
            apps -= app
            idToApp -= app.id
            endpointToApp -= app.driver
            addressToApp -= app.driver.address
            if (reverseProxy) {
                webUi.removeProxyTargets(app.id)
            }
            if (completedApps.size >= RETAINED_APPLICATIONS) {
                val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
                completedApps.take(toRemove).foreach { a =>
                    applicationMetricsSystem.removeSource(a.appSource)
                }
                completedApps.trimStart(toRemove)
            }
            completedApps += app // Remember it in our history
            waitingApps -= app

            for (exec <- app.executors.values) {
                killExecutor(exec)
            }
            app.markFinished(state)
            if (state != ApplicationState.FINISHED) {
                app.driver.send(ApplicationRemoved(state.toString))
            }
            persistenceEngine.removeApplication(app)
            schedule()

            // Tell all workers that the application has finished, so they can clean up any app state.
            workers.foreach { w =>
                w.endpoint.send(ApplicationFinished(app.id))
            }
        }
    }

    /**
      * Handle a request to set the target number of executors for this application.
      *
      * If the executor limit is adjusted upwards, new executors will be launched provided
      * that there are workers with sufficient resources. If it is adjusted downwards, however,
      * we do not kill existing executors until we explicitly receive a kill request.
      *
      * @return whether the application has previously registered with this Master.
      */
    private def handleRequestExecutors(appId: String, requestedTotal: Int): Boolean = {
        idToApp.get(appId) match {
            case Some(appInfo) =>
                logInfo(s"Application $appId requested to set total executors to $requestedTotal.")
                appInfo.executorLimit = requestedTotal
                schedule()
                true
            case None =>
                logWarning(s"Unknown application $appId requested $requestedTotal total executors.")
                false
        }
    }

    /**
      * Handle a kill request from the given application.
      *
      * This method assumes the executor limit has already been adjusted downwards through
      * a separate [[RequestExecutors]] message, such that we do not launch new executors
      * immediately after the old ones are removed.
      *
      * @return whether the application has previously registered with this Master.
      */
    private def handleKillExecutors(appId: String, executorIds: Seq[Int]): Boolean = {
        idToApp.get(appId) match {
            case Some(appInfo) =>
                logInfo(s"Application $appId requests to kill executors: " + executorIds.mkString(", "))
                val (known, unknown) = executorIds.partition(appInfo.executors.contains)
                known.foreach { executorId =>
                    val desc = appInfo.executors(executorId)
                    appInfo.removeExecutor(desc)
                    killExecutor(desc)
                }
                if (unknown.nonEmpty) {
                    logWarning(s"Application $appId attempted to kill non-existent executors: "
                        + unknown.mkString(", "))
                }
                schedule()
                true
            case None =>
                logWarning(s"Unregistered application $appId requested us to kill executors!")
                false
        }
    }

    /**
      * Cast the given executor IDs to integers and filter out the ones that fail.
      *
      * All executor IDs should be integers since we launched these executors. However,
      * the kill interface on the driver side accepts arbitrary strings, so we need to
      * handle non-integer executor IDs just to be safe.
      */
    private def formatExecutorIds(executorIds: Seq[String]): Seq[Int] = {
        executorIds.flatMap { executorId =>
            try {
                Some(executorId.toInt)
            } catch {
                case e: NumberFormatException =>
                    logError(s"Encountered executor with a non-integer ID: $executorId. Ignoring")
                    None
            }
        }
    }

    /**
      * Ask the worker on which the specified executor is launched to kill the executor.
      */
    private def killExecutor(exec: ExecutorDesc): Unit = {
        exec.worker.removeExecutor(exec)
        exec.worker.endpoint.send(KillExecutor(masterUrl, exec.application.id, exec.id))
        exec.state = ExecutorState.KILLED
    }

    /** Generate a new app ID given an app's submission date */
    private def newApplicationId(submitDate: Date): String = {
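        // Produces IDs such as "app-20170314100512-0003": the submission timestamp plus a 4-digit counter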
        val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber)
        nextAppNumber += 1
        appId
    }

    /** Check for, and remove, any timed-out workers */
    private def timeOutDeadWorkers() {
        // Copy the workers into an array so we don't modify the hashset while iterating through it
        val currentTime = System.currentTimeMillis()
        val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
        for (worker <- toRemove) {
            if (worker.state != WorkerState.DEAD) {
                logWarning("Removing %s because we got no heartbeat in %d seconds".format(
                    worker.id, WORKER_TIMEOUT_MS / 1000))
                removeWorker(worker)
            } else {
                // A worker counts as long dead once its last heartbeat is older than
                // (REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS (16 minutes with the defaults)
                if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
                    workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
                }
            }
        }
    }

    private def newDriverId(submitDate: Date): String = {
        val driverId = "driver-%s-%04d".format(createDateFormat.format(submitDate), nextDriverNumber)
        nextDriverNumber += 1
        driverId
    }

    private def createDriver(desc: DriverDescription): DriverInfo = {
        val now = System.currentTimeMillis()
        val date = new Date(now)
        new DriverInfo(now, newDriverId(date), desc, date)
    }

    private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
        logInfo("Launching driver " + driver.id + " on worker " + worker.id)
        worker.addDriver(driver)
        driver.worker = Some(worker)
        worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
        driver.state = DriverState.RUNNING
    }

    private def removeDriver(
                                driverId: String,
                                finalState: DriverState,
                                exception: Option[Exception]) {
        drivers.find(d => d.id == driverId) match {
            case Some(driver) =>
                logInfo(s"Removing driver: $driverId")
                drivers -= driver
                if (completedDrivers.size >= RETAINED_DRIVERS) {
                    val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
                    completedDrivers.trimStart(toRemove)
                }
                completedDrivers += driver
                persistenceEngine.removeDriver(driver)
                driver.state = finalState
                driver.exception = exception
                driver.worker.foreach(w => w.removeDriver(driver))
                schedule()
            case None =>
                logWarning(s"Asked to remove unknown driver: $driverId")
        }
    }
}

/*
  Startup flow of the Master:
  1. Parse the command-line arguments.
  2. Start the RPC environment and the Master endpoint:
    2.1 Create the RpcEnv.
    2.2 Create and register the Master endpoint, getting back its RpcEndpointRef.
    2.3 Ask the Master endpoint for a BoundPortsResponse.
    2.4 Return a three-tuple: (rpcEnv: the RPC environment, web UI port, REST server port).
 */
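/*
  In a typical standalone deployment this class is not launched by hand: sbin/start-master.sh
  starts it (via spark-daemon.sh) with arguments such as "--host <hostname> --port 7077
  --webui-port 8080", which MasterArguments parses below.
 */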
private[deploy] object Master extends Logging {
    val SYSTEM_NAME = "sparkMaster"
    val ENDPOINT_NAME = "Master"

    // Entry point for starting the Master
    def main(argStrings: Array[String]) {
        Utils.initDaemon(log)
        val conf = new SparkConf
        // Build the argument parser, e.g. --host hadoop201 --port 7077 --webui-port 8080
        val args = new MasterArguments(argStrings, conf)
        // Start the RPC environment and the Master endpoint
        val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
        rpcEnv.awaitTermination()
    }

    /**
      * Start the Master and return a three-tuple of:
      * (1) The Master RpcEnv
      * (2) The web UI bound port
      * (3) The REST server bound port, if any
      */
    def startRpcEnvAndEndpoint(
                                  host: String,
                                  port: Int,
                                  webUiPort: Int,
                                  conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
        val securityMgr = new SecurityManager(conf)
        // Create the Master-side RpcEnv; the actual implementation is NettyRpcEnv.
        // Example parameters: sparkMaster, hadoop201, 7077, conf, securityMgr
        val rpcEnv: RpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
        // Create the Master object (itself an RpcEndpoint) and register it with the RpcEnv.
        // The returned RpcEndpointRef is used to send messages to it and receive replies.
        val masterEndpoint: RpcEndpointRef = rpcEnv.setupEndpoint(ENDPOINT_NAME,
            new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
        // Ask the Master endpoint for a BoundPortsResponse: a case class with three fields,
        // rpcEndpointPort, webUIPort (the web UI port) and restPort (the REST server port).
        val portsResponse: BoundPortsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
        (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
    }
}
